package org.zstacks.znet.codec;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.zstacks.znet.log.Logger;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.nio.Session;


public class StringClient extends StringAdaptor implements Closeable {
	private static final Logger log = Logger.getLogger(StringClient.class);
	
	private final Dispatcher dispatcher;
	private Session session;
	protected final String brokerAddress;
	private String host = "127.0.0.1";
	private int port = 15555;
	 
	private int connectTimeout = 3000;  
	private CountDownLatch latch = new CountDownLatch(1);
	
	public StringClient(String address, Dispatcher dispatcher){
		this.brokerAddress = address;
		this.dispatcher = dispatcher;
		String[] blocks = address.split("[:]");
		if(blocks.length > 2){
			throw new IllegalArgumentException("Illegal address: "+address);
		}
		String host = blocks[0].trim();
		int port = 15555;
		if(blocks.length > 1){
			port = Integer.valueOf(blocks[1].trim());
		}  
		initialize(host, port, dispatcher);
	}
	
	
	public StringClient(String host, int port, Dispatcher dispatcher){
		this.brokerAddress = String.format("%s:%d", host, port);
		this.dispatcher = dispatcher;
		initialize(host, port, dispatcher);
	}
	
	
	private void initialize(String host, int port, Dispatcher dispatcher){
		this.host = host;
		this.port = port; 
		if(!dispatcher.isStarted()){
			dispatcher.start();
		} 
	}
	
	
	protected void doConnect() throws IOException { 
    	if(this.session != null ){
    		if (this.session.isActive() || this.session.isNew()){  
    			return;
    		}
    	} 
    	this.session = dispatcher.registerClientChannel(host, port, this);
	}
	
	public void connect(int timeoutMillis) throws IOException{  
    	doConnect(); 
    	this.session.waitToConnect(timeoutMillis);
    }
    
    public boolean hasConnected(){
    	return session != null && session.isActive();
    }
  
    public void ensureConnected(){ 
		while(!this.hasConnected()){
			try {
				this.connect(connectTimeout);
			} catch (IOException e) {
				log.info(e.getMessage(), e);
			}
		}
	}  
    
    public void connectIfNeed() throws IOException{
    	if(!this.hasConnected()){
    		//同步进行连接操作
    		this.connect(this.connectTimeout);
    	}
    }  
	
	@Override
	public void close() throws IOException {
		if(this.session != null){
			this.session.close();
		} 
	} 

    public void invokeAsync(String req) throws IOException { 
    	connectIfNeed();
    	latch = new CountDownLatch(1);
    	session.write(req); 
    	try {
			latch.await();
		} catch (InterruptedException e) { 
			e.printStackTrace();
		}
    	
	} 
 
	
	@Override
    public void onMessage(Object obj, Session sess) throws IOException {   
		latch.countDown();
	} 
	
	public static void main(String[] args) throws Exception { 
		
		//1)创建Dispatcher
		final Dispatcher dispatcher = new Dispatcher(); 
		//2)建立链接
		final StringClient client = new StringClient("127.0.0.1:80", dispatcher);
		long start = System.currentTimeMillis();
		int N = 100000;
		for(int i=0;i<N;i++){
			client.invokeAsync("test");
		}
		long end = System.currentTimeMillis();
		
		client.close();
		dispatcher.close();
		System.out.println(N*1000.0/(end-start));
		
	}
}
